Echtzeitdaten mit Änderungsstreams lesen
Mit Änderungsstreams für Firestore mit MongoDB-Kompatibilität können Anwendungen in Echtzeit auf Änderungen (Einfügungen, Aktualisierungen und Löschungen) zugreifen, die an einer Sammlung oder einer gesamten Datenbank vorgenommen wurden. In einem Änderungsstream werden Updates nach Änderungszeit sortiert.
Änderungsstreams sind über die MongoDB-kompatiblen APIs und herkömmlichen MongoDB-Treiber zugänglich. Die Firestore-Implementierung von Change Streams mit MongoDB-Kompatibilität kann jeden Durchsatz von Schreib- und Lesevorgängen durch eine einzigartige Implementierung der automatischen Partitionierung bei Schreibvorgängen und der Leseparallelität verarbeiten. So können Sie Arbeitslasten mit hohem Durchsatz erstellen. Sie können auch die Infrastruktur für die Migration und Datensynchronisierung zwischen Firestore und anderen Speicherlösungen verbessern.
Zusätzlich zur Kompatibilität mit den MongoDB-Treibern können Sie mit Firestore Change Streams parallel lesen. So können Sie parallele Arbeitslasten mit hohem Durchsatz erstellen. Jeder Stream stellt eine gut verteilte Partition der Ergebnisse dar.
Change Streams unterstützen die folgenden Funktionen:
- Konfigurierbare Änderungsstreams mit Datenbank- oder Sammlungsumfang.
- Eine Aufbewahrungsdauer für einen Änderungsstream, die bei der Erstellung angegeben wird. Die Standardaufbewahrungsdauer beträgt 7 Tage und die Mindestaufbewahrungsdauer 1 Tag. Die Aufbewahrungsdauer muss ein Vielfaches von 1 Tag sein, bis zu einem Maximum von 7 Tagen. Die Aufbewahrungsdauer kann nach dem Erstellen nicht mehr geändert werden. Wenn Sie den Aufbewahrungszeitraum ändern möchten, müssen Sie den Änderungsstream löschen und neu erstellen.
delete,insert,updateunddropsind Ereignisse, die sich mitdb.collection.watch()unddb.watch()beobachten lassen.updateDescription.updatedFieldsenthält Update-Differenzen.- Alle
fullDocument- undfullDocumentBeforeChange-Optionen.- Das vollständige Dokument wird nach Updates durchsucht.
- Das ursprüngliche Bild des Dokuments, bevor es ersetzt, aktualisiert oder gelöscht wurde.
- Bild des Dokuments nach dem Ersetzen oder Aktualisieren.
- Für Vorher- und Nachher-Bilder, die älter als eine Stunde sind, muss die Wiederherstellung zu einem bestimmten Zeitpunkt (Point-In-Time Recovery, PITR) aktiviert sein.
- Alle Optionen zum Fortsetzen, einschließlich
resumeAfterundstartAfter. - Wenn Sie
watch()verwenden, um Änderungen zu beobachten, können Sie Aggregationsphasen wie$addFields,$match,$project,$replaceRoot,$replaceWith,$setund$unsetverketten.
Änderungsstreams konfigurieren
Verwenden Sie dieGoogle Cloud Console, um Change Streams für eine Datenbank zu erstellen, zu löschen oder anzusehen.
Rollen und Berechtigungen
Zum Erstellen, Löschen und Auflisten von Änderungsstreams benötigt ein Prinzipal die IAM-Berechtigungen (Identity and Access Management) datastore.schemas.create, datastore.schemas.delete und datastore.schemas.list.
Die Rolle Datastore-Index-Administrator (roles/datastore.indexAdmin) gewährt beispielsweise diese Berechtigungen.
Änderungsstream erstellen
Bevor Sie einen entsprechenden Änderungsstream-Cursor öffnen können, müssen Sie einen Änderungsstream erstellen. Die automatische Aktivierung von Änderungsstreams beim Erstellen von Sammlungen oder Datenbanken wird nicht unterstützt.
Verwenden Sie die Google Cloud -Console, um einen Änderungsstream zu erstellen.
-
Rufen Sie in der Google Cloud Console die Seite Datenbanken auf.
- Wählen Sie in der Liste eine Firestore-Datenbank mit MongoDB-Kompatibilität aus. Der Bereich Firestore Studio wird geöffnet.
- Suchen Sie im Bereich Explorer nach dem Knoten Änderungsstreams, klicken Sie auf Weitere Aktionen und wählen Sie dann Änderungsstream erstellen aus.
- Geben Sie einen eindeutigen Namen, Bereich und Aufbewahrungszeitraum für den Änderungsstream ein und klicken Sie auf Speichern.
Änderungsstreams ansehen
Details zu Änderungsstreams finden Sie in der Google Cloud -Konsole.
-
Rufen Sie in der Google Cloud Console die Seite Datenbanken auf.
- Wählen Sie in der Liste eine Firestore-Datenbank mit MongoDB-Kompatibilität aus. Der Bereich Firestore Studio wird geöffnet.
- Suchen Sie im Bereich Explorer nach dem Knoten Change Streams (Änderungsstreams).
- Klicken Sie auf Knoten ein-/ausblenden, um den Knoten zu öffnen oder zu schließen.
Einen Änderungsstream löschen
Verwenden Sie die Google Cloud Console, um einen Änderungsstream zu löschen.
-
Rufen Sie in der Google Cloud Console die Seite Datenbanken auf.
- Wählen Sie in der Liste eine Firestore-Datenbank mit MongoDB-Kompatibilität aus. Der Bereich Firestore Studio wird geöffnet.
- Suchen Sie im Bereich Explorer nach dem Knoten Change Streams (Änderungsstreams).
- Klicken Sie auf Knoten ein-/ausblenden, um den Knoten zu öffnen oder zu schließen.
- Suchen Sie im Explorer nach dem Änderungsstream, den Sie löschen möchten.
- Klicken Sie auf Weitere Aktionen und wählen Sie dann Änderungsstream löschen aus.
- Geben Sie im Dialogfeld den Namen des Änderungsstreams ein, um das Löschen zu bestätigen, und klicken Sie dann auf Löschen.
Änderungsstream-Cursor öffnen oder fortsetzen
Die folgenden Beispiele zeigen, wie Sie einen Änderungsstream-Cursor erstellen, fortsetzen und konfigurieren.
Bevor Sie einen Änderungsstream-Cursor erstellen können, müssen Sie explizit einen Änderungsstream für die Datenbank oder Sammlung erstellen.
Cursor für Änderungsstream erstellen
Verwenden Sie die Methode watch in den MongoDB-Treibern, um einen neuen Änderungsstream-Cursor zu erstellen.
Wenn Sie alle Änderungen an einer Datenbank erfassen möchten, erstellen Sie einen Änderungsstream mit Datenbankbereich und rufen Sie die Methode watch für das Objekt db auf.
let cursor = db.watch()
Wenn Sie einen Cursor erstellen möchten, der auf eine Sammlung beschränkt ist, müssen Sie zuerst einen Änderungsstream für diese Sammlung erstellen. Rufen Sie dann die Methode watch für die entsprechende Sammlung auf.
let cursor = db.my_collection.watch()
Nachdem Sie einen Änderungsstream-Cursor erstellt haben, können Sie mit dem Streamen beginnen.
Wenn Sie beispielsweise ein Dokument einfügen und tryNext für den Cursor aufrufen, wird die Änderung im Änderungsstream angezeigt.
let doc = db.my_collection.insertOne({value: "hello world"}) console.log(cursor.tryNext())
Wenn Sie das Dokument aktualisieren und löschen, werden diese Änderungen im Änderungsstream angezeigt:
db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}}) db.my_collection.deleteOne({"_id": doc.insertedId}}) // Prints the update event console.log(cursor.tryNext()) // Prints the delete event console.log(cursor.tryNext())
Änderungsstream fortsetzen
Verwenden Sie die Optionen resumeAfter oder startAfter, um einen Änderungsstream fortzusetzen.
Verwenden Sie ein Fortsetzungstoken, um zu bestimmen, wo im Änderungslog die Fortsetzung erfolgen soll.resumeAfterstartAfter
// Create a cursor and add one event to the change stream. let cursor = db.my_collection.watch(); db.my_collection.insertOne({value: "hello world"}); let event = cursor.tryNext(); // Get the resume token from the event. let resumeToken = event._id; // Add a new event to the change stream. db.my_collection.insertOne({value: "foobar"}); // Create a new cursor by using the resume token as a starting point. let newCursor = db.my_collection.watch({resumeAfter: resumeToken}) // Log the change event containing the "foobar" value. console.log(newCursor.tryNext())
Mithilfe von startAfter:
// Start after the resume token. let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})
Vorher- und Nachher-Bilder in Aktualisierungen und Löschvorgänge einbeziehen
Bei Bedarf können Sie Vorher- und Nachher-Bilder von Dokumenten in Änderungsereignisse für Aktualisierungen und Löschvorgänge einfügen. Die Verfügbarkeit von Bildern unterliegt dem Zeitfenster für die Wiederherstellung zu einem bestimmten Zeitpunkt. Wenn Sie Dokumentbilder lesen möchten, die älter als eine Stunde sind, müssen Sie die Wiederherstellung zu einem bestimmten Zeitpunkt aktivieren.
Änderungsstreams nutzen das PITR-Fenster, um eine Ansicht des Dokuments vor und nach dem angegebenen Änderungsereignis bereitzustellen. Standardmäßig enthalten Update-Ereignisse das Feld updateDescription, das die Differenz der Felder angibt, die durch den Update-Vorgang geändert wurden.
Wenn Sie die Bilder vor und nach der Änderung in ein Änderungsereignis einfügen möchten, müssen Sie die Optionen fullDocumentBeforeChange und fullDocument in der Änderungsstream-Abfrage angeben.
let cursor = db.my_collection.watch({ "fullDocument": "required", "fullDocumentBeforeChange": "required" })
Wenn mit der Abfrage versucht wird, ein Dokument außerhalb des PITR-Aufbewahrungszeitraums zu lesen, oder wenn PITR nicht aktiviert ist, wird für den required-Wert eine serverseitige Fehlermeldung ausgegeben.
Alternativ zum Auslösen eines Fehlers können Sie den Wert whenAvailable verwenden, um den Wert null zurückzugeben, wenn die Bilder nicht mehr verfügbar sind.
let cursor = db.my_collection.watch({ "fullDocument": "whenAvailable", "fullDocumentBeforeChange": "whenAvailable" })
Aktuelles Bild in Updates einbeziehen
Standardmäßig enthalten Update-Ereignisse das Feld updateDescription, das das Delta der durch den Updatevorgang geänderten Felder ist. Wenn Sie stattdessen die aktuellste Version des gesamten Dokuments abrufen möchten, verwenden Sie den Wert updateLookup in der Option fullDocument.
Für diese Funktion ist kein PITR erforderlich. Es wird eine Suche nach dem Dokument durchgeführt.
let cursor = db.my_collection.watch({ "fullDocument": "updateLookup", })
Parallele Lesevorgänge
Um den Durchsatz zu erhöhen, können Sie mit der Option firestoreWorkerConfig eine Change Stream-Abfrage auf mehrere Worker aufteilen. Jeder Worker ist für die Bereitstellung der Änderungen für eine bestimmte Gruppe von Dokumenten verantwortlich. Sie müssen einen parallelen Cursor über eine runCommand- oder aggregate-Abfrage erstellen.
Sie können beispielsweise einen Änderungsstream so auf drei Worker verteilen:
let cursor1 = db.my_collection.aggregate([{ "$changeStream": { "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }} }]); let cursor2 = db.my_collection.aggregate([{ "$changeStream": { "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }} }]); let cursor3 = db.my_collection.aggregate([{ "$changeStream": { "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }} }]);
Änderungsstreams und Sicherungen
Weder die Konfiguration des Änderungsstreams noch die Daten des Änderungsstreams sind bei der Wiederherstellung von Sicherungen verfügbar. Wenn Sie eine Datenbank mit Change Streams wiederherstellen, müssen Sie diese Change Streams in der Zieldatenbank neu erstellen, um Cursor für diese Datenbank zu öffnen.
Abrechnung
- Für Änderungsstreams fallen Lese- und Speicherkosten an. Weitere Informationen
- Wenn Sie Vorher- und Nachher-Bilder einbeziehen möchten, die älter als 1 Stunde zum Zeitpunkt der Leseanfrage sind, müssen Sie PITR aktivieren. Dadurch entstehen PITR-Kosten.
Verhaltensunterschiede
Im folgenden Abschnitt werden die Unterschiede bei Change Streams zwischen Firestore mit MongoDB-Kompatibilität und MongoDB beschrieben.
updateDescription
updateDescription ist ein Dokument in einem update-Ereignis, in dem die Felder beschrieben werden, die durch den Aktualisierungsvorgang aktualisiert oder entfernt wurden. In Firestore sind die wichtigsten Unterschiede:
- In
updateDescriptionwerden die FeldertruncatedArraysunddisambiguatedPathsnicht ausgefüllt. updateDescription.updatedFieldsstellen einen kanonischen Unterschied zwischen den Vorher- und Nachher-Bildern eines Dokuments vor und nach einer Mutation dar.
Stellen Sie sich den folgenden Ausgangszustand eines Dokuments vor:
db.my_collection.insertOne({ _id: 1, root: { array: [{a: 1}, {b: 2}, {c: 3}] } })
Szenario 1: Nur das erste Element des Arrays ändern.
In diesem Szenario entspricht das Firestore-Verhalten dem von MongoDB.
db.my_collection.updateOne( {_id: 1}, {'$set': {"root.array.0.a": 100}} ) { updatedFields: {"root.array.0.a": 100}, removedFields: [] }
Szenario 2: Mit einem ganzen Array überschreiben
In diesem Szenario wird nur das erste Arrayfeld aktualisiert, das gesamte Array wird jedoch überschrieben.
Der Firestore-Aktualisierungsunterschied unterscheidet nicht zwischen diesen beiden Szenarien und gibt für beide denselben updateDescription.updatedFields zurück:
db.my_collection.updateOne( {_id: 1}, {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}} ) // In other implementations, updatedFields reflects the mutation itself { updatedFields: { "root.array": [{a: 100}, {b: 2}, {c: 3}] }, removedFields: [] } // Firestore updatedFields is the diff between the before and after versions of the document { updatedFields: {"root.array.0.a": 100}, removedFields: [] }