Cette page décrit les flux de modifications dans Spanner pour les bases de données utilisant le dialecte GoogleSQL et celles utilisant le dialecte PostgreSQL. Elle aborde les points suivants :
- Modèle de partitionnement basé sur la division
- Format et contenu des enregistrements de flux de modifications
- Syntaxe de bas niveau utilisée pour interroger ces enregistrements
- Exemple de workflow de requête
Vous utilisez l'API Spanner pour interroger directement les flux de modifications. Les applications qui utilisent Dataflow pour lire les données du flux de modifications n'ont pas besoin de fonctionner directement avec le modèle de données décrit ici.
Pour obtenir un guide d'introduction plus général sur les flux de modifications, consultez Présentation des flux de modifications.
Modifier les partitions du flux de modifications
Lorsqu'une modification se produit dans une table surveillée par un flux de modifications, Spanner écrit un enregistrement de flux de modifications correspondant dans la base de données, de manière synchrone dans la même transaction que la modification des données. Cela signifie que si la transaction réussit, Spanner a également capturé et conservé la modification. En interne, Spanner colocalise l'enregistrement du flux de modifications et la modification des données afin qu'ils soient traités par le même serveur pour minimiser la surcharge d'écriture.
Dans le cadre de la LMD pour une fraction spécifique, Spanner ajoute l'écriture à la fraction de données du flux de modifications correspondant dans la même transaction. Grâce à cette colocation, les flux de modifications n'ajoutent pas de coordination supplémentaire entre les ressources de diffusion, ce qui minimise la surcharge de validation des transactions.
Spanner évolue en divisant et en fusionnant dynamiquement les données en fonction de la charge et de la taille de la base de données, et en distribuant les divisions entre les ressources de diffusion.
Pour permettre aux écritures et aux lectures de flux de modifications de s'adapter à la charge, Spanner divise et fusionne le stockage interne des flux de modifications avec les données de la base de données, en évitant automatiquement les points chauds. Pour permettre la lecture des enregistrements de flux de modifications en temps quasi réel à mesure que les écritures de base de données évoluent, l'API Spanner est conçue pour qu'un flux de modifications puisse être interrogé simultanément à l'aide de partitions de flux de modifications. Les partitions de flux de modifications sont mappées aux divisions de données de flux de modifications qui contiennent les enregistrements de flux de modifications. Les partitions d'un flux de modifications changent de manière dynamique au fil du temps et sont corrélées à la façon dont Spanner divise et fusionne dynamiquement les données de la base de données.
Une partition de flux de modifications contient des enregistrements pour une plage de clés immuable pour une plage de temps spécifique. Toute partition de flux de modifications peut être divisée en une ou plusieurs partitions de flux de modifications, ou être fusionnée avec d'autres partitions de flux de modifications. Lorsque ces événements de fractionnement ou de fusion se produisent, des partitions enfants sont créées pour enregistrer les modifications apportées à leurs plages de clés immuables respectives pour la période suivante. En plus des enregistrements de modification des données, une requête de flux de modifications renvoie des enregistrements de partition enfant pour informer les lecteurs des nouvelles partitions de flux de modifications à interroger, ainsi que des enregistrements de signal de présence pour indiquer la progression lorsque aucune écriture n'a eu lieu récemment.
Lorsque vous interrogez une partition de flux de modifications spécifique, les enregistrements de modifications sont renvoyés dans l'ordre de l'horodatage de commit. Chaque enregistrement de modification est renvoyé une seule fois. L'ordre des enregistrements de modifications n'est pas garanti dans les partitions de flux de modifications. Les enregistrements de modifications pour une clé primaire spécifique ne sont renvoyés que sur une partition pour une plage de temps spécifique.
En raison de la lignée de partition parent-enfant, pour traiter les modifications d'une clé particulière dans l'ordre des codes temporels de commit, les enregistrements renvoyés par les partitions enfants ne doivent être traités qu'après le traitement des enregistrements de toutes les partitions parentes.
Fonctions de lecture du flux de modifications et syntaxe des requêtes
GoogleSQL
Pour interroger les flux de modifications, utilisez l'API ExecuteStreamingSql. Spanner crée automatiquement une fonction de lecture spéciale avec le flux de modifications. La fonction de lecture permet d'accéder aux enregistrements du flux de modifications. La convention d'attribution de noms pour la fonction de lecture est READ_change_stream_name.
En supposant qu'un flux de modifications SingersNameStream existe dans la base de données, la syntaxe de requête pour GoogleSQL est la suivante :
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
La fonction de lecture accepte les arguments suivants :
| Nom de l'argument | Type | Obligatoire ? | Description |
|---|---|---|---|
start_timestamp |
TIMESTAMP |
Obligatoire | Indique que les enregistrements dont la valeur commit_timestamp est supérieure ou égale à start_timestamp doivent être renvoyés. La valeur doit être comprise dans la période de conservation du flux de modifications, et doit être inférieure ou égale à l'heure actuelle, et supérieure ou égale au code temporel de la création du flux de modifications. |
end_timestamp |
TIMESTAMP |
Facultatif (par défaut : NULL) |
Indique que les enregistrements dont la valeur commit_timestamp est inférieure ou égale à end_timestamp doivent être renvoyés. La valeur doit être comprise dans la période de conservation du flux de modifications et être supérieure ou égale à start_timestamp. La requête se termine après avoir renvoyé tous les ChangeRecords jusqu'à end_timestamp ou lorsque vous mettez fin à la connexion. Si end_timestamp est défini sur NULL ou n'est pas spécifié, la requête continue de s'exécuter jusqu'à ce que tous les ChangeRecords soient renvoyés ou jusqu'à ce que vous mettiez fin à la connexion. |
partition_token |
STRING |
Facultatif (par défaut : NULL) |
Spécifie la partition de flux de modifications à interroger, en fonction du contenu des enregistrements des partitions enfants. Si la valeur est NULL ou n'est pas spécifiée, cela signifie que le lecteur interroge le flux de modifications pour la première fois et n'a obtenu aucun jeton de partition spécifique à partir duquel effectuer la requête. |
heartbeat_milliseconds |
INT64 |
Obligatoire | Détermine la fréquence à laquelle un signal de pulsation ChangeRecord est renvoyé en l'absence de transactions validées dans cette partition.
La valeur doit être comprise entre 1,000 (une seconde) et
300,000 (cinq minutes). |
read_options |
ARRAY |
Facultatif (par défaut : NULL) |
Ajoute des options de lecture réservées pour une utilisation ultérieure. La seule valeur autorisée est NULL. |
Nous vous recommandons de créer une méthode d'assistance pour créer le texte de la requête de la fonction de lecture et y associer des paramètres, comme indiqué dans l'exemple suivant.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
Pour interroger les flux de modifications, utilisez l'API ExecuteStreamingSql. Spanner crée automatiquement une fonction de lecture spéciale avec le flux de modifications. La fonction de lecture permet d'accéder aux enregistrements du flux de modifications. La convention d'attribution de noms pour la fonction de lecture est spanner.read_json_change_stream_name.
En supposant qu'un flux de modifications SingersNameStream existe dans la base de données, la syntaxe de requête pour PostgreSQL est la suivante :
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
La fonction de lecture accepte les arguments suivants :
| Nom de l'argument | Type | Obligatoire ? | Description |
|---|---|---|---|
start_timestamp |
timestamp with time zone |
Obligatoire | Indique que les journaux des modifications dont commit_timestamp est supérieur ou égal à start_timestamp doivent être renvoyés. La valeur doit être comprise dans la période de conservation du flux de modifications, et doit être inférieure ou égale à l'heure actuelle, et supérieure ou égale au code temporel de la création du flux de modifications. |
end_timestamp |
timestamp with timezone |
Facultatif (par défaut : NULL) |
Indique que les enregistrements de modifications dont la valeur commit_timestamp est inférieure ou égale à end_timestamp doivent être renvoyés. La valeur doit être comprise dans la période de conservation du flux de modifications et être supérieure ou égale à start_timestamp.
La requête se termine après avoir renvoyé tous les enregistrements de modifications jusqu'à end_timestamp ou jusqu'à ce que vous mettiez fin à la connexion.
Si la valeur est NULL, la requête continue de s'exécuter jusqu'à ce que tous les enregistrements de modifications soient renvoyés ou jusqu'à ce que vous mettiez fin à la connexion. |
partition_token |
text |
Facultatif (par défaut : NULL) |
Spécifie la partition de flux de modifications à interroger, en fonction du contenu des enregistrements des partitions enfants. Si la valeur est NULL ou n'est pas spécifiée, cela signifie que le lecteur interroge le flux de modifications pour la première fois et n'a obtenu aucun jeton de partition spécifique à partir duquel effectuer la requête. |
heartbeat_milliseconds |
bigint |
Obligatoire | Détermine la fréquence à laquelle un signal de présence ChangeRecord est renvoyé lorsqu'aucune transaction n'est validée dans cette partition.
La valeur doit être comprise entre 1,000 (une seconde) et 300,000 (cinq minutes). |
null |
null |
Obligatoire | Réservé pour une utilisation ultérieure |
Nous vous recommandons de créer une méthode d'assistance pour créer le texte de la fonction de lecture et y associer des paramètres, comme indiqué dans l'exemple suivant.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Modifier le format d'enregistrement des flux
GoogleSQL
La fonction de lecture des flux de modifications renvoie une seule colonne ChangeRecord de type ARRAY<STRUCT<...>>. Dans chaque ligne, ce tableau contient toujours un seul élément.
Les éléments du tableau sont du type suivant :
STRUCT < data_change_record ARRAY<STRUCT<...>>, heartbeat_record ARRAY<STRUCT<...>>, child_partitions_record ARRAY<STRUCT<...>> >
Ce STRUCT comporte trois champs : data_change_record, heartbeat_record et child_partitions_record, chacun de type ARRAY<STRUCT<...>>. Dans chaque ligne renvoyée par la fonction de lecture du flux de modifications, un seul de ces trois champs contient une valeur. Les deux autres sont vides ou contiennent NULL. Ces champs de tableau contiennent au maximum un élément.
Les sections suivantes examinent chacun de ces trois types d'enregistrements.
PostgreSQL
La fonction de lecture des flux de modifications renvoie une seule colonne ChangeRecord de type JSON avec la structure suivante :
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
Cet objet peut contenir trois clés : data_change_record, heartbeat_record et child_partitions_record. Le type de valeur correspondant est JSON. Dans chaque ligne renvoyée par la fonction de lecture du flux de modifications, une seule de ces trois clés existe.
Les sections suivantes examinent chacun de ces trois types d'enregistrements.
Enregistrements des modifications de données
Un enregistrement de modification de données contient un ensemble de modifications apportées à une table avec le même type de modification (insertion, mise à jour ou suppression) validé au même horodatage de validation dans une partition de flux de modifications pour la même transaction. Plusieurs enregistrements de modification de données peuvent être renvoyés pour la même transaction dans plusieurs partitions de flux de modifications.
Tous les enregistrements de modifications de données comportent les champs commit_timestamp, server_transaction_id et record_sequence, qui déterminent ensemble l'ordre dans le flux de modifications pour un enregistrement de flux. Ces trois champs suffisent à déduire l'ordre des modifications et à assurer la cohérence externe.
Notez que plusieurs transactions peuvent avoir le même code temporel de validation si elles touchent des données qui ne se chevauchent pas. Le champ server_transaction_id permet de distinguer l'ensemble des modifications (potentiellement dans les partitions du flux de modifications) qui ont été émises dans la même transaction. En l'associant aux champs record_sequence et number_of_records_in_transaction, vous pouvez également mettre en mémoire tampon et ordonner tous les enregistrements d'une transaction spécifique.
Les champs d'un enregistrement de modification des données incluent les éléments suivants :
GoogleSQL
| Champ | Type | Description |
|---|---|---|
commit_timestamp |
TIMESTAMP |
Indique le code temporel auquel la modification a été appliquée. |
record_sequence |
STRING |
Indique le numéro de séquence de l'enregistrement dans la transaction.
Les numéros de séquence sont uniques et augmentent de façon monotone (mais ne sont pas nécessairement contigus) dans une transaction. Triez les enregistrements pour le même server_transaction_id par record_sequence afin de reconstruire l'ordre des modifications au sein de la transaction.
Spanner peut optimiser cet ordre pour améliorer les performances. Il ne correspondra donc pas toujours à l'ordre d'origine que vous fournissez. |
server_transaction_id |
STRING |
Fournit une chaîne unique représentant la transaction dans laquelle la modification a été validée. La valeur ne doit être utilisée que dans le contexte du traitement des enregistrements de flux de modifications et n'est pas corrélée à l'ID de transaction dans l'API de Spanner. |
is_last_record_in_transaction_in_partition |
BOOL |
Indique s'il s'agit du dernier enregistrement d'une transaction dans la partition actuelle. |
table_name |
STRING |
Nom de la table concernée par la modification. |
value_capture_type |
STRING |
Décrit le type de capture de valeur spécifié dans la configuration du flux de modifications lorsque cette modification a été capturée. Le type de capture de valeur peut être l'un des suivants :
Par défaut, il s'agit de |
column_types |
[
{
"name": "STRING",
"type": {
"code": "STRING"
},
"is_primary_key": BOOLEAN
"ordinal_position": NUMBER
},
...
] |
Indique le nom de la colonne, son type, si elle est une clé primaire et sa position telle qu'elle est définie dans le schéma (ordinal_position). La première colonne d'une table du schéma aurait une position ordinale de 1. Le type de colonne peut être imbriqué pour les colonnes de tableau. Le format correspond à la structure de type décrite dans la documentation de référence de l'API Spanner.
|
mods |
[
{
"keys": {"STRING" : "STRING"},
"new_values": {
"STRING" : "VALUE-TYPE",
[...]
},
"old_values": {
"STRING" : "VALUE-TYPE",
[...]
},
},
[...]
]
|
Décrit les modifications apportées, y compris les valeurs de clé primaire, les anciennes valeurs et les nouvelles valeurs des colonnes modifiées ou suivies.
La disponibilité et le contenu des anciennes et nouvelles valeurs dépendent de la value_capture_type configurée. Les champs new_values et old_values ne contiennent que les colonnes non clés. |
mod_type |
STRING |
Décrit le type de modification. Spécifiez l'un des contrôles suivants : INSERT, UPDATE ou DELETE. |
number_of_records_in_transaction |
INT64 |
Indique le nombre d'enregistrements de modifications de données faisant partie de cette transaction dans toutes les partitions de flux de modifications. |
number_of_partitions_in_transaction |
INT64 |
Indique le nombre de partitions qui renvoient des enregistrements de modification de données pour cette transaction. |
transaction_tag |
STRING |
Indique le tag de transaction associé à cette transaction. |
is_system_transaction |
BOOL |
Indique si la transaction est une transaction système. |
PostgreSQL
| Champ | Type | Description |
|---|---|---|
commit_timestamp |
STRING |
Indique le code temporel auquel la modification a été appliquée. |
record_sequence |
STRING |
Indique le numéro de séquence de l'enregistrement dans la transaction.
Les numéros de séquence sont uniques et augmentent de façon monotone (mais ne sont pas nécessairement contigus) dans une transaction. Triez les enregistrements pour le même server_transaction_id par record_sequence afin de reconstruire l'ordre des modifications au sein de la transaction. |
server_transaction_id |
STRING |
Fournit une chaîne unique représentant la transaction dans laquelle la modification a été validée. Cette valeur ne doit être utilisée que dans le contexte du traitement des enregistrements de flux de modifications et n'est pas corrélée à l'ID de transaction dans l'API de Spanner. |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Indique s'il s'agit du dernier enregistrement d'une transaction dans la partition actuelle. |
table_name |
STRING |
Indique le nom de la table concernée par la modification. |
value_capture_type |
STRING |
Décrit le type de capture de valeur spécifié dans la configuration du flux de modifications lorsque cette modification a été capturée. Le type de capture de valeur peut être l'un des suivants :
Par défaut, il s'agit de |
column_types |
[
{
"name": "STRING",
"type": {
"code": "STRING"
},
"is_primary_key": BOOLEAN
"ordinal_position": NUMBER
},
...
] |
Indique le nom de la colonne, son type, si elle est une clé primaire et sa position telle que définie dans le schéma (ordinal_position). La première colonne d'une table du schéma aurait une position ordinale de 1. Le type de colonne peut être imbriqué pour les colonnes de tableau. Le format correspond à la structure de type décrite dans la documentation de référence de l'API Spanner.
|
mods |
[
{
"keys": {"STRING" : "STRING"},
"new_values": {
"STRING" : "VALUE-TYPE",
[...]
},
"old_values": {
"STRING" : "VALUE-TYPE",
[...]
},
},
[...]
]
|
Décrit les modifications apportées, y compris les valeurs de clé primaire, les anciennes valeurs et les nouvelles valeurs des colonnes modifiées ou suivies. La disponibilité et le contenu des anciennes et nouvelles valeurs dépendent de la value_capture_type configurée. Les champs new_values et old_values ne contiennent que les colonnes non clés.
|
mod_type |
STRING |
Décrit le type de modification. Spécifiez l'un des contrôles suivants : INSERT, UPDATE ou DELETE. |
number_of_records_in_transaction |
INT64 |
Indique le nombre d'enregistrements de modifications de données faisant partie de cette transaction dans toutes les partitions de flux de modifications. |
number_of_partitions_in_transaction |
NUMBER |
Indique le nombre de partitions qui renvoient des enregistrements de modification de données pour cette transaction. |
transaction_tag |
STRING |
Indique le tag de transaction associé à cette transaction. |
is_system_transaction |
BOOLEAN |
Indique si la transaction est une transaction système. |
Exemple d'enregistrement de modification des données
Vous trouverez ci-dessous deux exemples d'enregistrements de modifications de données. Elles décrivent une seule transaction dans laquelle un transfert est effectué entre deux comptes. Les deux comptes se trouvent dans des partitions de flux de modifications distinctes.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
L'enregistrement de modification de données suivant est un exemple d'enregistrement avec le type de capture de valeur NEW_VALUES. Notez que seules les nouvelles valeurs sont renseignées.
Seule la colonne LastUpdate a été modifiée. C'est donc la seule qui a été renvoyée.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
L'enregistrement de modification de données suivant est un exemple d'enregistrement avec le type de capture de valeur NEW_ROW. Seule la colonne LastUpdate a été modifiée, mais toutes les colonnes suivies sont renvoyées.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
L'enregistrement de modification de données suivant est un exemple d'enregistrement avec le type de capture de valeur NEW_ROW_AND_OLD_VALUES. Seule la colonne LastUpdate a été modifiée, mais toutes les colonnes suivies sont renvoyées. Ce type de capture de valeur capture la nouvelle valeur et l'ancienne valeur de LastUpdate.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Enregistrements de pulsations
Lorsqu'un enregistrement de signal de présence est renvoyé, cela indique que toutes les modifications dont le commit_timestamp est inférieur ou égal au timestamp de l'enregistrement de signal de présence ont été renvoyées. Les futurs enregistrements de données de cette partition doivent avoir des codes temporels de commit supérieurs à ceux renvoyés par l'enregistrement de signal de présence. Les enregistrements de pulsations sont renvoyés lorsqu'aucune modification de données n'est écrite dans une partition. Lorsque des modifications de données sont écrites dans la partition, data_change_record.commit_timestamp peut être utilisé à la place de heartbeat_record.timestamp pour indiquer que le lecteur progresse dans la lecture de la partition.
Vous pouvez utiliser les enregistrements de pulsation renvoyés sur les partitions pour synchroniser les lecteurs sur toutes les partitions. Une fois que tous les lecteurs ont reçu un signal de présence supérieur ou égal à un certain code temporel A, ou ont reçu des données ou des enregistrements de partition enfant supérieurs ou égaux au code temporel A, ils savent qu'ils ont reçu tous les enregistrements validés à ce code temporel A ou avant. Ils peuvent alors commencer à traiter les enregistrements mis en mémoire tampon, par exemple en triant les enregistrements inter-partitions par code temporel et en les regroupant par server_transaction_id.
Un enregistrement de signal de présence ne contient qu'un seul champ :
GoogleSQL
| Champ | Type | Description |
|---|---|---|
timestamp |
TIMESTAMP |
Indique le code temporel de l'enregistrement de la pulsation. |
PostgreSQL
| Champ | Type | Description |
|---|---|---|
timestamp |
STRING |
Indique le code temporel de l'enregistrement de la pulsation. |
Exemple d'enregistrement de pulsation
Exemple d'enregistrement de signal de présence indiquant que tous les enregistrements dont le code temporel est inférieur ou égal à celui de cet enregistrement ont été renvoyés :
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Enregistrements de partitions enfants
Les enregistrements de partitions enfants renvoient des informations sur les partitions enfants : leurs jetons de partition, les jetons de leurs partitions parentes et le start_timestamp qui représente le code temporel le plus ancien pour lequel les partitions enfants contiennent des enregistrements de modifications. Les enregistrements dont les codes temporels de validation sont immédiatement antérieurs à child_partitions_record.start_timestamp sont renvoyés dans la partition actuelle. Après avoir renvoyé tous les enregistrements de partition enfant pour cette partition, cette requête renvoie un état de réussite, indiquant que tous les enregistrements ont été renvoyés pour cette partition.
Les champs d'un enregistrement de partition enfant incluent les éléments suivants :
GoogleSQL
| Champ | Type | Description |
|---|---|---|
start_timestamp |
TIMESTAMP |
Indique que les enregistrements de modification des données renvoyés à partir des partitions enfants dans cet enregistrement de partition enfant ont un code temporel de commit supérieur ou égal à start_timestamp. Lorsque vous interrogez une partition enfant, la requête doit spécifier le jeton de partition enfant et un start_timestamp supérieur ou égal à child_partitions_token.start_timestamp. Tous les enregistrements de partitions enfants renvoyés par une partition ont le même start_timestamp, et le code temporel se situe toujours entre les start_timestamp et end_timestamp spécifiés dans la requête. |
record_sequence |
STRING |
Indique un numéro de séquence croissant de façon monotone qui peut être utilisé pour définir l'ordre des enregistrements de partition enfant lorsqu'il existe plusieurs enregistrements de partition enfant renvoyés avec le même start_timestamp dans une partition spécifique. Le jeton de partition, start_timestamp et record_sequence, identifie de manière unique un enregistrement de partition enfant.
|
child_partitions |
[
{
"token" : "STRING",
"parent_partition_tokens" : ["STRING"]
}
] |
Renvoie un ensemble de partitions enfants et les informations associées. Cela inclut la chaîne de jetons de partition utilisée pour identifier la partition enfant dans les requêtes, ainsi que les jetons de ses partitions parentes. |
PostgreSQL
| Champ | Type | Description |
|---|---|---|
start_timestamp |
STRING |
Indique que les enregistrements de modification des données renvoyés à partir des partitions enfants dans cet enregistrement de partition enfant ont un code temporel de commit supérieur ou égal à start_timestamp. Lorsque vous interrogez une partition enfant, la requête doit spécifier le jeton de partition enfant et un start_timestamp supérieur ou égal à child_partitions_token.start_timestamp. Tous les enregistrements de partitions enfants renvoyés par une partition ont le même start_timestamp, et le code temporel se situe toujours entre les start_timestamp et end_timestamp spécifiés dans la requête.
|
record_sequence |
STRING |
Indique un numéro de séquence croissant de façon monotone qui peut être utilisé pour définir l'ordre des enregistrements de partition enfant lorsqu'il existe plusieurs enregistrements de partition enfant renvoyés avec le même start_timestamp dans une partition spécifique. Le jeton de partition, start_timestamp et record_sequence, identifie de manière unique un enregistrement de partition enfant.
|
child_partitions |
[
{
"token": "STRING",
"parent_partition_tokens": ["STRING"],
}, [...]
] |
Renvoie un tableau de partitions enfants et les informations associées. Cela inclut la chaîne de jetons de partition utilisée pour identifier la partition enfant dans les requêtes, ainsi que les jetons de ses partitions parentes. |
Exemple d'enregistrement de partition enfant
Voici un exemple d'enregistrement de partition enfant :
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Workflow de requête des flux de modifications
Exécutez des requêtes de flux de modifications à l'aide de l'API ExecuteStreamingSql, avec une transaction en lecture seule à usage unique et une limite d'horodatage forte. La fonction de lecture du flux de modifications vous permet de spécifier start_timestamp et end_timestamp pour la période qui vous intéresse. Tous les enregistrements de modifications inclus dans la période de conservation sont accessibles à l'aide de la limite temporelle de lecture seule forte.
Tous les autres TransactionOptions ne sont pas valides pour les requêtes de flux de modifications. De plus, si TransactionOptions.read_only.return_read_timestamp est défini sur true, une valeur spéciale de kint64max - 1 est renvoyée dans le message Transaction qui décrit la transaction, au lieu d'un code temporel de lecture valide. Cette valeur spéciale doit être ignorée et ne doit pas être utilisée pour les requêtes ultérieures.
Chaque requête de flux de modifications peut renvoyer un nombre quelconque de lignes, chacune contenant un enregistrement de modification de données, un enregistrement de signal de présence ou un enregistrement de partitions enfants. Il n'est pas nécessaire de définir un délai pour la demande.
Exemple de workflow de requête de flux de modifications
Le workflow de requête de flux commence par l'émission de la toute première requête de flux de modifications en spécifiant partition_token sur NULL. La requête doit spécifier la fonction de lecture du flux de modifications, les codes temporels de début et de fin qui vous intéressent, ainsi que l'intervalle de signal de présence. Lorsque end_timestamp est défini sur NULL, la requête continue de renvoyer les modifications de données jusqu'à la fin de la partition.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Traitez les enregistrements de données de cette requête jusqu'à ce que tous les enregistrements de partition enfant soient renvoyés. Dans l'exemple suivant, deux enregistrements de partition enfant et trois jetons de partition sont renvoyés, puis la requête se termine. Les enregistrements de partition enfant d'une requête spécifique partagent toujours le même start_timestamp.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": "1000012389",
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": "1000012390",
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
Pour traiter les modifications après 2022-05-01T09:00:01Z, créez trois requêtes et exécutez-les en parallèle. Utilisées ensemble, les trois requêtes renvoient les modifications de données pour la même plage de clés que celle couverte par leur parent. Définissez toujours start_timestamp sur start_timestamp dans le même enregistrement de partition enfant et utilisez le même end_timestamp et le même intervalle de signal de présence pour traiter les enregistrements de manière cohérente dans toutes les requêtes.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
La requête sur child_token_2 se termine après avoir renvoyé un autre enregistrement de partition enfant. Cet enregistrement indique qu'une nouvelle partition couvre les modifications pour child_token_2 et child_token_3 à partir du 2022-05-01T09:30:15Z. La requête sur child_token_3 renvoie exactement le même enregistrement, car les deux sont les partitions parentes du nouveau child_token_4. Pour garantir un traitement strict et ordonné des enregistrements de données pour une clé spécifique, la requête sur child_token_4 doit commencer une fois que tous les parents ont terminé. Dans ce cas, les parents sont child_token_2 et child_token_3. Ne créez qu'une seule requête pour chaque jeton de partition enfant. La conception du workflow de requête doit désigner un parent pour attendre et planifier la requête sur child_token_4.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": "1000012389",
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": ["child_token_2", "child_token_3"],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Vous trouverez des exemples de gestion et d'analyse des enregistrements de flux de modifications dans le connecteur Dataflow SpannerIO Apache Beam sur GitHub.