En esta página, se describen los flujos de cambios en Spanner para las bases de datos con dialecto de GoogleSQL y las bases de datos con dialecto de PostgreSQL, incluidos los siguientes temas:
- El modelo de partición basado en divisiones
- El formato y el contenido de los registros de flujos de cambios
- La sintaxis de bajo nivel que se usa para consultar esos registros
- Ejemplo del flujo de trabajo de la búsqueda
Usas la API de Spanner para consultar flujos de cambios directamente. Las aplicaciones que, en cambio, usan Dataflow para leer datos de flujos de cambios no necesitan trabajar directamente con el modelo de datos que se describe aquí.
Para obtener una guía introductoria más amplia sobre los flujos de cambios, consulta Descripción general de los flujos de cambios.
Particiones de flujos de cambios
Cuando se produce un cambio en una tabla que supervisa un flujo de cambios, Spanner escribe un registro de flujo de cambios correspondiente en la base de datos de forma síncrona en la misma transacción que el cambio de datos. Esto significa que, si la transacción se realiza correctamente, Spanner también capturó y persistió el cambio de forma correcta. Internamente, Spanner ubica el registro del flujo de cambios y el cambio de datos en el mismo lugar para que los procese el mismo servidor y, así, minimizar la sobrecarga de escritura.
Como parte del DML en una división en particular, Spanner agrega la escritura a la división de datos del flujo de cambios correspondiente en la misma transacción. Debido a esta ubicación conjunta, los flujos de cambios no agregan coordinación adicional entre los recursos de procesamiento, lo que minimiza la sobrecarga de confirmación de transacciones.
Spanner se ajusta dinámicamente dividiendo y combinando datos según la carga y el tamaño de la base de datos, y distribuyendo las divisiones entre los recursos de procesamiento.
Para permitir que las escrituras y lecturas de flujos de cambios se escalen, Spanner divide y combina el almacenamiento interno de flujos de cambios junto con los datos de la base de datos, lo que evita automáticamente los puntos calientes. Para admitir la lectura de registros de flujos de cambios casi en tiempo real a medida que se escalan las escrituras de la base de datos, la API de Spanner está diseñada para que se pueda consultar un flujo de cambios de forma simultánea con particiones de flujos de cambios. Las particiones de flujos de cambios se asignan a divisiones de datos de flujos de cambios que contienen los registros de flujos de cambios. Las particiones de un flujo de cambios cambian de forma dinámica con el tiempo y se correlacionan con la forma en que Spanner divide y combina dinámicamente los datos de la base de datos.
Una partición de flujo de cambios contiene registros para un rango de claves inmutable durante un período específico. Cualquier partición de flujo de cambios se puede dividir en una o más particiones de flujo de cambios, o bien se puede combinar con otras particiones de flujo de cambios. Cuando se producen estos eventos de división o combinación, se crean particiones secundarias para capturar los cambios en sus respectivos rangos de claves inmutables para el siguiente período. Además de los registros de cambios de datos, una consulta de transmisión de cambios devuelve registros de particiones secundarias para notificar a los lectores sobre las nuevas particiones de transmisión de cambios que se deben consultar, así como registros de latidos para indicar el progreso cuando no se realizaron escrituras recientemente.
Cuando se consulta una partición de transmisión de cambios en particular, los registros de cambios se devuelven en orden de marca de tiempo de confirmación. Cada registro de cambio se devuelve exactamente una vez. No se garantiza el orden de los registros de cambios en las particiones de flujos de cambios. Los registros de cambios para una clave primaria determinada solo se devuelven en una partición para un período específico.
Debido al linaje de partición principal-secundaria, para procesar los cambios de una clave en particular en orden de marca de tiempo de confirmación, los registros que se devuelven de las particiones secundarias solo se deben procesar después de que se hayan procesado los registros de todas las particiones principales.
Funciones de lectura de flujo de cambios y sintaxis de consultas
GoogleSQL
Para consultar flujos de cambios, usa la API de ExecuteStreamingSql. Spanner crea automáticamente una función de lectura especial junto con el flujo de cambios. La función de lectura proporciona acceso a los registros del flujo de cambios. La convención de nomenclatura de la función de lectura es READ_change_stream_name.
Suponiendo que existe un flujo de cambios SingersNameStream en la base de datos, la sintaxis de consulta para GoogleSQL es la siguiente:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
La función de lectura acepta los siguientes argumentos:
| Nombre del argumento | Tipo | ¿Es obligatorio? | Descripción |
|---|---|---|---|
start_timestamp |
TIMESTAMP |
Obligatorio | Especifica que se deben devolver los registros con commit_timestamp mayor o igual que start_timestamp. El valor debe estar dentro del período de retención del flujo de cambios, ser menor o igual que la hora actual y mayor o igual que la marca de tiempo de la creación del flujo de cambios. |
end_timestamp |
TIMESTAMP |
Opcional (predeterminado: NULL) |
Especifica que se deben devolver los registros con un valor de commit_timestamp menor o igual que end_timestamp. El valor debe estar dentro del período de retención del flujo de cambios y ser mayor o igual que start_timestamp. La consulta finaliza después de devolver todos los ChangeRecords hasta el end_timestamp o cuando terminas la conexión. Si end_timestamp se establece en NULL o no se especifica, la consulta continúa ejecutándose hasta que se devuelven todos los ChangeRecords o hasta que finalizas la conexión. |
partition_token |
STRING |
Opcional (predeterminado: NULL) |
Especifica qué partición del flujo de cambios se debe consultar, según el contenido de los registros de particiones secundarias. Si se establece en NULL o no se especifica, significa que el lector está consultando el flujo de cambios por primera vez y no obtuvo ningún token de partición específico para consultar. |
heartbeat_milliseconds |
INT64 |
Obligatorio | Determina con qué frecuencia se devuelve un latido ChangeRecord en caso de que no se confirmen transacciones en esta partición.
El valor debe estar entre 1,000 (un segundo) y
300,000 (cinco minutos). |
read_options |
ARRAY |
Opcional (predeterminado: NULL) |
Agrega opciones de lectura reservadas para uso futuro. El único valor permitido es NULL. |
Te recomendamos que crees un método auxiliar para compilar el texto de la consulta de la función de lectura y vincularle parámetros, como se muestra en el siguiente ejemplo.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
Para consultar flujos de cambios, usa la API de ExecuteStreamingSql. Spanner crea automáticamente una función de lectura especial junto con el flujo de cambios. La función de lectura proporciona acceso a los registros del flujo de cambios. La convención de nomenclatura de la función de lectura es spanner.read_json_change_stream_name.
Suponiendo que existe un flujo de cambios SingersNameStream en la base de datos, la sintaxis de la consulta para PostgreSQL es la siguiente:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
La función de lectura acepta los siguientes argumentos:
| Nombre del argumento | Tipo | ¿Es obligatorio? | Descripción |
|---|---|---|---|
start_timestamp |
timestamp with time zone |
Obligatorio | Especifica que se deben devolver los registros de cambios con commit_timestamp mayor o igual que start_timestamp. El valor debe estar dentro del período de retención del flujo de cambios, ser inferior o igual a la hora actual y ser superior o igual a la marca de tiempo de la creación del flujo de cambios. |
end_timestamp |
timestamp with timezone |
Opcional (predeterminado: NULL) |
Especifica que se deben devolver los registros de cambios con commit_timestamp menor o igual que end_timestamp. El valor debe estar dentro del período de retención del flujo de cambios y ser mayor o igual que start_timestamp.
La consulta finaliza después de devolver todos los registros de cambios hasta el end_timestamp o hasta que finalices la conexión.
Si es NULL, la consulta continúa ejecutándose hasta que se devuelven todos los registros de cambios o hasta que finalizas la conexión. |
partition_token |
text |
Opcional (predeterminado: NULL) |
Especifica qué partición del flujo de cambios se debe consultar, según el contenido de los registros de particiones secundarias. Si se establece en NULL o no se especifica, significa que el lector está consultando el flujo de cambios por primera vez y no obtuvo ningún token de partición específico para consultar. |
heartbeat_milliseconds |
bigint |
Obligatorio | Determina con qué frecuencia se devuelve un latido ChangeRecord cuando no hay transacciones confirmadas en esta partición.
El valor debe estar entre 1,000 (un segundo) y 300,000 (cinco minutos). |
null |
null |
Obligatorio | Reservado para uso futuro |
Te recomendamos que crees un método auxiliar para compilar el texto de la función de lectura y vincular parámetros a él, como se muestra en el siguiente ejemplo.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Formato de registro de los flujos de cambios
GoogleSQL
La función de lectura de flujos de cambios devuelve una sola columna ChangeRecord de tipo ARRAY<STRUCT<...>>. En cada fila, este array siempre contiene un solo elemento.
Los elementos del array tienen el siguiente tipo:
STRUCT < data_change_record ARRAY<STRUCT<...>>, heartbeat_record ARRAY<STRUCT<...>>, child_partitions_record ARRAY<STRUCT<...>> >
Este STRUCT tiene tres campos: data_change_record, heartbeat_record y child_partitions_record, cada uno de tipo ARRAY<STRUCT<...>>. En cualquier fila que devuelva la función de lectura de transmisiones de cambios, solo uno de estos tres campos contiene un valor; los otros dos están vacíos o son NULL. Estos campos de array contienen, como máximo, un elemento.
En las siguientes secciones, se examina cada uno de estos tres tipos de registros.
PostgreSQL
La función de lectura de flujos de cambios devuelve una sola columna ChangeRecord de tipo JSON con la siguiente estructura:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
Este objeto tiene tres claves posibles: data_change_record, heartbeat_record y child_partitions_record. El tipo de valor correspondiente es JSON. En cualquier fila que devuelva la función de lectura del flujo de cambios, solo existirá una de estas tres claves.
En las siguientes secciones, se examina cada uno de estos tres tipos de registros.
Registros de cambios de datos
Un registro de cambio de datos contiene un conjunto de cambios en una tabla con el mismo tipo de modificación (inserción, actualización o eliminación) confirmados en la misma marca de tiempo de confirmación en una partición de flujo de cambios para la misma transacción. Se pueden devolver varios registros de cambio de datos para la misma transacción en varias particiones del flujo de cambios.
Todos los registros de cambios de datos tienen los campos commit_timestamp, server_transaction_id y record_sequence, que juntos determinan el orden en el flujo de cambios para un registro de flujo. Estos tres campos son suficientes para derivar el orden de los cambios y proporcionar coherencia externa.
Ten en cuenta que varias transacciones pueden tener la misma marca de tiempo de confirmación si afectan datos que no se superponen. El campo server_transaction_id ofrece la capacidad de distinguir qué conjunto de cambios (potencialmente en todas las particiones del flujo de cambios) se emitieron dentro de la misma transacción. Si lo combinas con los campos record_sequence y number_of_records_in_transaction, también puedes almacenar en búfer y ordenar todos los registros de una transacción en particular.
Los campos de un registro de cambio de datos incluyen lo siguiente:
GoogleSQL
| Campo | Tipo | Descripción |
|---|---|---|
commit_timestamp |
TIMESTAMP |
Indica la marca de tiempo en la que se confirmó el cambio. |
record_sequence |
STRING |
Indica el número de secuencia del registro dentro de la transacción.
Los números de secuencia son únicos y aumentan de forma monótona (pero no necesariamente contigua) dentro de una transacción. Ordena los registros del mismo server_transaction_id por record_sequence para reconstruir el orden de los cambios dentro de la transacción.
Spanner podría optimizar este orden para obtener un mejor rendimiento, y es posible que no siempre coincida con el orden original que proporcionas. |
server_transaction_id |
STRING |
Proporciona una cadena única a nivel global que representa la transacción en la que se confirmó el cambio. El valor solo se debe usar en el contexto del procesamiento de registros de transmisión de cambios y no se correlaciona con el ID de transacción en la API de Spanner. |
is_last_record_in_transaction_in_partition |
BOOL |
Indica si este es el último registro de una transacción en la partición actual. |
table_name |
STRING |
Es el nombre de la tabla afectada por el cambio. |
value_capture_type |
STRING |
Describe el tipo de captura de valor que se especificó en la configuración del flujo de cambios cuando se capturó este cambio. El tipo de captura de valor puede ser uno de los siguientes:
De forma predeterminada, es |
column_types |
[
{
"name": "STRING",
"type": {
"code": "STRING"
},
"is_primary_key": BOOLEAN
"ordinal_position": NUMBER
},
...
] |
Indica el nombre de la columna, el tipo de columna, si es una clave primaria y la posición de la columna tal como se define en el esquema (ordinal_position). La primera columna de una tabla en el esquema tendría una posición ordinal de 1. El tipo de columna puede estar anidado para las columnas de array. El formato coincide con la estructura de tipo que se describe en la referencia de la API de Spanner.
|
mods |
[
{
"keys": {"STRING" : "STRING"},
"new_values": {
"STRING" : "VALUE-TYPE",
[...]
},
"old_values": {
"STRING" : "VALUE-TYPE",
[...]
},
},
[...]
]
|
Describe los cambios que se realizaron, incluidos los valores de la clave primaria, los valores anteriores y los valores nuevos de las columnas modificadas o rastreadas.
La disponibilidad y el contenido de los valores antiguos y nuevos dependen del value_capture_type configurado. Los campos new_values y old_values solo contienen las columnas sin clave. |
mod_type |
STRING |
Describe el tipo de cambio. Es uno de los siguientes: INSERT,
UPDATE o DELETE. |
number_of_records_in_transaction |
INT64 |
Indica la cantidad de registros de cambios de datos que forman parte de esta transacción en todas las particiones de transmisión de cambios. |
number_of_partitions_in_transaction |
INT64 |
Indica la cantidad de particiones que muestran registros de cambios de datos para esta transacción. |
transaction_tag |
STRING |
Indica la etiqueta de transacción asociada con esta transacción. |
is_system_transaction |
BOOL |
Indica si la transacción es del sistema. |
PostgreSQL
| Campo | Tipo | Descripción |
|---|---|---|
commit_timestamp |
STRING |
Indica la marca de tiempo en la que se confirmó el cambio. |
record_sequence |
STRING |
Indica el número de secuencia del registro dentro de la transacción.
Los números de secuencia son únicos y aumentan de forma monótona (pero no necesariamente contigua) dentro de una transacción. Ordena los registros del mismo server_transaction_id por record_sequence para reconstruir el orden de los cambios dentro de la transacción. |
server_transaction_id |
STRING |
Proporciona una cadena única a nivel global que representa la transacción en la que se confirmó el cambio. El valor solo se debe usar en el contexto del procesamiento de registros de transmisión de cambios y no se correlaciona con el ID de transacción en la API de Spanner. |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Indica si este es el último registro de una transacción en la partición actual. |
table_name |
STRING |
Indica el nombre de la tabla afectada por el cambio. |
value_capture_type |
STRING |
Describe el tipo de captura de valor que se especificó en la configuración del flujo de cambios cuando se capturó este cambio. El tipo de captura de valor puede ser uno de los siguientes:
De forma predeterminada, es |
column_types |
[
{
"name": "STRING",
"type": {
"code": "STRING"
},
"is_primary_key": BOOLEAN
"ordinal_position": NUMBER
},
...
] |
Indica el nombre de la columna, el tipo de columna, si es una clave primaria y la posición de la columna tal como se define en el esquema (ordinal_position). La primera columna de una tabla en el esquema tendría una posición ordinal de 1. El tipo de columna puede estar anidado para las columnas de array. El formato coincide con la estructura de tipo que se describe en la referencia de la API de Spanner.
|
mods |
[
{
"keys": {"STRING" : "STRING"},
"new_values": {
"STRING" : "VALUE-TYPE",
[...]
},
"old_values": {
"STRING" : "VALUE-TYPE",
[...]
},
},
[...]
]
|
Describe los cambios que se realizaron, incluidos los valores de clave primaria, los valores anteriores y los valores nuevos de las columnas modificadas o rastreadas. La disponibilidad y el contenido de los valores antiguos y nuevos dependen del value_capture_type configurado. Los campos new_values y old_values solo contienen las columnas que no son clave.
|
mod_type |
STRING |
Describe el tipo de cambio. Es uno de los siguientes: INSERT,
UPDATE o DELETE. |
number_of_records_in_transaction |
INT64 |
Indica la cantidad de registros de cambios de datos que forman parte de esta transacción en todas las particiones de transmisión de cambios. |
number_of_partitions_in_transaction |
NUMBER |
Indica la cantidad de particiones que muestran registros de cambios de datos para esta transacción. |
transaction_tag |
STRING |
Indica la etiqueta de transacción asociada con esta transacción. |
is_system_transaction |
BOOLEAN |
Indica si la transacción es del sistema. |
Ejemplo de registro de cambio de datos
A continuación, se muestra un par de registros de cambios de datos de ejemplo. Describen una sola transacción en la que se realiza una transferencia entre dos cuentas. Las dos cuentas se encuentran en particiones de transmisiones de cambios separadas.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
El siguiente registro de cambio de datos es un ejemplo de un registro con el tipo de captura de valores NEW_VALUES. Ten en cuenta que solo se propagan los valores nuevos.
Solo se modificó la columna LastUpdate, por lo que solo se devolvió esa columna.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
El siguiente registro de cambio de datos es un ejemplo de un registro con el tipo de captura de valores NEW_ROW. Solo se modificó la columna LastUpdate, pero se muestran todas las columnas rastreadas.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
El siguiente registro de cambio de datos es un ejemplo de un registro con el tipo de captura de valores NEW_ROW_AND_OLD_VALUES. Solo se modificó la columna LastUpdate, pero se muestran todas las columnas rastreadas. Este tipo de captura de valor registra el valor nuevo y el valor anterior de LastUpdate.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Registros de latidos
Cuando se devuelve un registro de latido, indica que se devolvieron todos los cambios con commit_timestamp menor o igual que el timestamp del registro de latido, y los registros de datos futuros en esta partición deben tener marcas de tiempo de confirmación más altas que las que devuelve el registro de latido. Los registros de latidos se devuelven cuando no se escriben cambios de datos en una partición. Cuando se escriben cambios de datos en la partición, se puede usar data_change_record.commit_timestamp en lugar de heartbeat_record.timestamp para indicar que el lector está avanzando en la lectura de la partición.
Puedes usar los registros de la señal de monitoreo de funcionamiento que se devuelven en las particiones para sincronizar los lectores en todas las particiones. Una vez que todos los lectores hayan recibido un latido mayor o igual a alguna marca de tiempo A, o bien hayan recibido datos o registros de partición secundaria mayores o iguales a la marca de tiempo A, los lectores sabrán que recibieron todos los registros confirmados en esa marca de tiempo A o antes, y podrán comenzar a procesar los registros almacenados en búfer, por ejemplo, ordenar los registros entre particiones por marca de tiempo y agruparlos por server_transaction_id.
Un registro de latido contiene solo un campo:
GoogleSQL
| Campo | Tipo | Descripción |
|---|---|---|
timestamp |
TIMESTAMP |
Indica la marca de tiempo del registro de señal de monitoreo de funcionamiento. |
PostgreSQL
| Campo | Tipo | Descripción |
|---|---|---|
timestamp |
STRING |
Indica la marca de tiempo del registro de señal de monitoreo de funcionamiento. |
Ejemplo de registro de señal de monitoreo de funcionamiento
Ejemplo de un registro de latido que comunica que se devolvieron todos los registros con marcas de tiempo menores o iguales que la marca de tiempo de este registro:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Registros de particiones secundarias
Los registros de particiones secundarias devuelven información sobre las particiones secundarias: sus tokens de partición, los tokens de sus particiones principales y el start_timestamp que representa la marca de tiempo más antigua para la que las particiones secundarias contienen registros de cambios. Los registros cuyas marcas de tiempo de confirmación son inmediatamente anteriores a child_partitions_record.start_timestamp se devuelven en la partición actual. Después de devolver todos los registros de partición secundaria para esta partición, esta consulta se devuelve con un estado de éxito, lo que indica que se devolvieron todos los registros para esta partición.
Los campos de un registro de partición secundaria incluyen lo siguiente:
GoogleSQL
| Campo | Tipo | Descripción |
|---|---|---|
start_timestamp |
TIMESTAMP |
Indica que los registros de cambio de datos que se devolvieron de las particiones secundarias en este registro de partición secundaria tienen una marca de tiempo de confirmación mayor o igual que start_timestamp. Cuando se consulta una partición secundaria, la consulta debe especificar el token de partición secundaria y un start_timestamp mayor o igual que child_partitions_token.start_timestamp. Todos los registros de particiones secundarias que devuelve una partición tienen el mismo start_timestamp, y la marca de tiempo siempre se encuentra entre el start_timestamp y el end_timestamp especificados de la consulta. |
record_sequence |
STRING |
Indica un número de secuencia que aumenta de forma monótona y que se puede usar para definir el orden de los registros de partición secundaria cuando se devuelven varios registros de partición secundaria con el mismo start_timestamp en una partición determinada. El token de partición, start_timestamp y record_sequence, identifica de forma única un registro de partición secundaria.
|
child_partitions |
[
{
"token" : "STRING",
"parent_partition_tokens" : ["STRING"]
}
] |
Devuelve un conjunto de particiones secundarias y su información asociada. Esto incluye la cadena de token de partición que se usa para identificar la partición secundaria en las búsquedas, así como los tokens de sus particiones principales. |
PostgreSQL
| Campo | Tipo | Descripción |
|---|---|---|
start_timestamp |
STRING |
Indica que los registros de cambio de datos que se devolvieron de las particiones secundarias en este registro de partición secundaria tienen una marca de tiempo de confirmación mayor o igual que start_timestamp. Cuando se consulta una partición secundaria, la consulta debe especificar el token de partición secundaria y un start_timestamp mayor o igual que child_partitions_token.start_timestamp. Todos los registros de particiones secundarias que devuelve una partición tienen el mismo start_timestamp, y la marca de tiempo siempre se encuentra entre el start_timestamp y el end_timestamp especificados de la consulta.
|
record_sequence |
STRING |
Indica un número de secuencia que aumenta de forma monótona y que se puede usar para definir el orden de los registros de partición secundaria cuando se devuelven varios registros de partición secundaria con el mismo start_timestamp en una partición determinada. El token de partición, start_timestamp y record_sequence, identifica de forma única un registro de partición secundaria.
|
child_partitions |
[
{
"token": "STRING",
"parent_partition_tokens": ["STRING"],
}, [...]
] |
Devuelve un array de las particiones secundarias y su información asociada. Esto incluye la cadena de token de partición que se usa para identificar la partición secundaria en las consultas, así como los tokens de sus particiones principales. |
Ejemplo de registro de partición secundaria
A continuación, se muestra un ejemplo de un registro de partición secundaria:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Flujo de trabajo de consulta de flujos de cambios
Ejecuta consultas de flujo de cambios con la API de ExecuteStreamingSql, con una transacción de solo lectura de un solo uso y un límite de marca de tiempo sólido. La función de lectura de transmisiones de cambios te permite especificar start_timestamp y end_timestamp para el período que te interesa. Se puede acceder a todos los registros de cambios dentro del período de retención con el límite de marca de tiempo de solo lectura sólido.
Todos los demás TransactionOptions no son válidos para las consultas de transmisiones de cambios. Además, si TransactionOptions.read_only.return_read_timestamp se establece en true, se devuelve un valor especial de kint64max - 1 en el mensaje Transaction que describe la transacción, en lugar de una marca de tiempo de lectura válida. Este valor especial se debe descartar y no se debe usar para ninguna consulta posterior.
Cada consulta de flujo de cambios puede devolver cualquier cantidad de filas, cada una de las cuales contiene un registro de cambio de datos, un registro de latido o un registro de particiones secundarias. No es necesario establecer una fecha límite para la solicitud.
Ejemplo de flujo de trabajo de consulta de flujo de cambios
El flujo de trabajo de la consulta de transmisión comienza con la emisión de la primera consulta de transmisión de cambios especificando partition_token a NULL. La consulta debe especificar la función de lectura para el flujo de cambios, la marca de tiempo de inicio y finalización de interés, y el intervalo de latidos. Cuando end_timestamp es NULL, la consulta sigue devolviendo cambios de datos hasta que finaliza la partición.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Procesa los registros de datos de esta consulta hasta que se devuelvan todos los registros de particiones secundarias. En el siguiente ejemplo, se muestran dos registros de partición secundaria y tres tokens de partición, y, luego, finaliza la consulta. Los registros de partición secundaria de una consulta específica siempre comparten el mismo start_timestamp.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": "1000012389",
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": "1000012390",
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
Para procesar los cambios después de 2022-05-01T09:00:01Z, crea tres consultas nuevas y ejecútalas en paralelo. Cuando se usan en conjunto, las tres consultas devuelven los cambios de datos para el mismo rango de claves que abarca su elemento superior. Siempre establece start_timestamp en el mismo start_timestamp del registro de partición secundaria y usa el mismo end_timestamp y el mismo intervalo de latidos para procesar los registros de manera coherente en todas las consultas.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
La consulta en child_token_2 finaliza después de devolver otro registro de partición secundaria. Este registro indica que una nueva partición cubre los cambios para child_token_2 y child_token_3 a partir del 2022-05-01T09:30:15Z. La consulta en child_token_3 devuelve el mismo registro exacto, ya que ambos son las particiones principales del nuevo child_token_4. Para garantizar un procesamiento estrictamente ordenado de los registros de datos para una clave en particular, la consulta en child_token_4 debe comenzar después de que hayan finalizado todos los elementos superiores. En este caso, los elementos superiores son child_token_2 y child_token_3. Solo crea una consulta para cada token de partición secundaria. El diseño del flujo de trabajo de la consulta debe designar un elemento principal para esperar y programar la consulta en child_token_4.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": "1000012389",
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": ["child_token_2", "child_token_3"],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Encuentra ejemplos de cómo controlar y analizar registros de transmisiones de cambios en el conector de Dataflow de SpannerIO de Apache Beam en GitHub.